1 module jupyter.wire.kernel;
2 
3 import jupyter.wire.message: Message;
4 
/// Version string reported as `protocol_version` in the `kernel_info_reply`
/// built by `Kernel.handleKernelInfoRequest`.
enum protocolVersion = "5.3.0";
6 
/**
   So users don't have to write their own main.

   Mixing this template into a module declares a `main` that runs a kernel
   backed by `Backend` using the command-line arguments.

   The generated `main` returns:
   $(UL
     $(LI 0 when `run` returns normally)
     $(LI 1 when an `Exception` escapes (logged as "Error: ..."))
     $(LI 2 when an `Error` escapes (logged as "FATAL ERROR: ..."))
   )
 */
mixin template Main(Backend) {
    int main(string[] args) {
        // A template mixin is evaluated in the scope where it is mixed in,
        // not where it is declared. Import everything used here locally so
        // that `import jupyter.wire.kernel: Main;` alone is enough for users.
        import jupyter.wire.kernel: run;
        import jupyter.wire.log: log;

        try {
            run!Backend(args);
            return 0;
        } catch(Exception e) {
            log("Error: ", e.toString);
            return 1;
        } catch(Error e) {
            // Deliberately caught so that the failure gets logged before the
            // process exits with a distinct status.
            log("FATAL ERROR: ", e.toString);
            return 2;
        }
    }
}
25 
/**
   Creates a default-constructed `Backend`, wraps it in a kernel configured
   from the connection file named on the command line, and runs that kernel.

   Params:
       args = the program arguments; exactly two entries are expected, the
              executable name followed by the connection file name.

   Throws: `Exception` (via `enforce`) with a usage message when `args` does
           not have exactly two entries.
 */
void run(Backend)(in string[] args) {
    import std.exception: enforce;

    // args[0] may be absent when called programmatically with an empty array,
    // so fall back to a placeholder for the usage message.
    const exeName = args.length > 0 ? args[0] : "<exeName>";
    enforce(args.length == 2, "Usage: " ~ exeName ~ " <connectionFileName>");

    const connectionFileName = args[1];
    auto backend = Backend();
    auto k = kernel(backend, connectionFileName);
    k.run;
}
38 
39 
/**
   Describes the language implemented by a backend.
   `Kernel.handleKernelInfoRequest` copies these fields into the
   `language_info` object of the `kernel_info_reply`.
 */
struct LanguageInfo {
    string name;          // sent as "name"
    string version_;      // sent as "version"
    string fileExtension; // sent as "file_extension"
    string mimeType;      // sent as "mimetype"
}
46 
47 
/**
   What a backend returns from `execute`.
   `Kernel.handleExecuteRequest` forwards `stdout` through `sockets.stdout`
   and publishes `result` in an `execute_result` message keyed by `mime`.
 */
struct ExecutionResult {
    string result;               // the value to publish under `mime`
    string stdout;               // text forwarded via `sockets.stdout`
    string mime = "text/plain";  // key used for `result` in the "data" object
}
53 
54 
/**
   Wrapper around a string, used as the parameter type for the standard
   output text accepted by `textResult` and `markdownResult`.
 */
struct Stdout {
    string value; // copied into `ExecutionResult.stdout` by the helpers
}
58 
/// Callback type handed to backends (`execute`, `commOpen`, `commMessage`,
/// `commClose`). The delegates `Kernel` passes for it set the message's
/// parent header to the current request's header and send it on the IOPub
/// socket.
alias IoPubMessageSender = void delegate(Message) @safe;
60 
/**
   Builds an `ExecutionResult` whose MIME type is "text/plain".

   Params:
       result = the value to report
       stdout = captured standard output; empty by default
 */
ExecutionResult textResult(string result, Stdout stdout = Stdout("")) @safe pure nothrow {
    ExecutionResult ret;
    ret.result = result;
    ret.stdout = stdout.value;
    ret.mime = "text/plain";
    return ret;
}
64 
/**
   Builds an `ExecutionResult` whose MIME type is "text/markdown".

   Params:
       result = the markdown text to report
       stdout = captured standard output; empty by default
 */
ExecutionResult markdownResult(string result, Stdout stdout = Stdout("")) @safe pure nothrow {
    ExecutionResult ret;
    ret.result = result;
    ret.stdout = stdout.value;
    ret.mime = "text/markdown";
    return ret;
}
68 
69 
/**
   Compile-time check that `T` can be used as a kernel backend: `T.init` must
   expose a `languageInfo` usable to initialise a `LanguageInfo`, and an
   `execute` callable with a string and an `IoPubMessageSender` whose result
   can initialise an `ExecutionResult`.
 */
enum isBackend(T) = is(typeof({
    LanguageInfo languageInfo = T.init.languageInfo;
    scope IoPubMessageSender ioPubSender = (Message){};
    ExecutionResult executionResult = T.init.execute("foo", ioPubSender);
}));
77 
78 
/**
   Factory that deduces `Backend` from its first argument and constructs a
   `Kernel!Backend` from the backend and the remaining arguments.
 */
auto kernel(Backend, Args...)(Backend backend, auto ref Args args) {
    alias KernelType = Kernel!Backend;
    return KernelType(backend, args);
}
82 
83 
/**
   Implements a generic Jupyter kernel.
   Parameterised by a `Backend` type that knows how to execute code.

   `Backend` must satisfy `isBackend`. It may additionally define any of
   `initialize`, `banner`, `complete`, `commOpen`, `commMessage` and
   `commClose`; each is detected at compile time with `hasMember` and used
   only when present.
 */
struct Kernel(Backend) if(isBackend!Backend) {

    import jupyter.wire.connection: ConnectionInfo, Sockets;
    import zmqd: Socket;
    import std.typecons: Nullable;

    private Backend backend;
    private Sockets sockets;
    // Counter reported as `execution_count`; starts at 1.
    private int executionCount = 1;
    // Set by `handleShutdown` to make `run` leave its loop.
    private bool stop;

    /// Constructs a kernel from the connection file with the given name.
    this(Backend backend, in string connectionFileName) {
        import jupyter.wire.connection: fileNameToConnectionInfo;
        this(backend, fileNameToConnectionInfo(connectionFileName));
    }

    /**
       Constructs a kernel from already parsed connection information,
       creating the sockets and calling `backend.initialize()` if the backend
       defines it.
     */
    this(Backend backend, ConnectionInfo connectionInfo) {
        import std.traits : hasMember;
        import jupyter.wire.log: log;

        log("Jupyter kernel starting with connection info ", connectionInfo);

        this.backend = backend;
        this.sockets = Sockets(connectionInfo);
        static if (hasMember!(Backend, "initialize"))
            this.backend.initialize();
    }

    /**
       Main loop: polls the shell socket and then the control socket for a
       request, handles whatever arrived, and sleeps for 10 milliseconds
       before polling again. Returns once a shutdown request set `stop`.
     */
    void run() {
        import jupyter.wire.connection: recvRequestMessage;
        import std.datetime: msecs;
        import core.thread: Thread;

        while(!stop) {
            maybeHandleRequestMessage(sockets.shell.recvRequestMessage);
            maybeHandleRequestMessage(sockets.control.recvRequestMessage);
            () @trusted { Thread.sleep(10.msecs); }();
        }
    }

    /// Handles `requestMessage` unless it is null (nothing was received).
    void maybeHandleRequestMessage(Nullable!Message requestMessage) {
        if(requestMessage.isNull) return;

        version(JupyterLogVerbose) {
            import jupyter.wire.log: log;
            log("Received message from the front-end.");
        }

        handleRequestMessage(requestMessage.get);
    }

    /**
       Publishes a "busy" status, dispatches the request on its message type
       and publishes an "idle" status afterwards, even when the handler
       throws. Unknown message types are ignored.
     */
    void handleRequestMessage(Message requestMessage) {

        import jupyter.wire.message: statusMessage;
        import jupyter.wire.log: log;

        version(JupyterLogVerbose) log("Sending busy message to the FE");
        auto busyMsg = statusMessage(requestMessage.header, "busy");
        sockets.send(sockets.ioPub, busyMsg);

        scope(exit) {
            version(JupyterLogVerbose) log("Sending idle message to the FE");
            auto idleMsg = statusMessage(requestMessage.header, "idle");
            sockets.send(sockets.ioPub, idleMsg);
        }

        switch(requestMessage.header.msgType) {

        default:
            break;

        case "complete_request":
            handleCompleteRequest(requestMessage);
            break;

        case "shutdown_request":
            version(JupyterLogVerbose) log("Told by the FE to shutdown");
            handleShutdown(requestMessage);
            break;

        case "kernel_info_request":
            version(JupyterLogVerbose) log("Asked by the FE to return kernel info");
            handleKernelInfoRequest(requestMessage);
            break;

        case "execute_request":
            version(JupyterLogVerbose) log("Told by the FE to execute code");
            handleExecuteRequest(requestMessage);
            break;

        case "comm_open":
            version(JupyterLogVerbose) log("Told by the FE to open a comm");
            handleCommOpen(requestMessage);
            break;

        case "comm_msg":
            version(JupyterLogVerbose) log("Received a comm msg from the FE");
            handleCommMessage(requestMessage);
            break;

        case "comm_close":
            version(JupyterLogVerbose) log("Told by the FE to close a comm");
            handleCommClose(requestMessage);
            break;
        }
    }

    /**
       Forwards a `comm_open` to `backend.commOpen` when the backend defines
       it. A comm close message is published when the backend has no
       `commOpen`, when `commOpen` returns a false value, or when it throws
       (the exception is rethrown after the close message is sent).
     */
    void handleCommOpen(Message requestMessage) {
        import std.traits: hasMember;
        import jupyter.wire.message: commCloseMessage;

        void closeComm() {
            sockets.send(sockets.ioPub, commCloseMessage(requestMessage));
        }

        static if (!hasMember!(Backend, "commOpen")) {
            closeComm();
        } else {
            try {
                // Lets the backend publish messages parented to this request.
                scope sender = (Message msg){
                        msg.parentHeader = requestMessage.header;
                        sockets.send(sockets.ioPub, msg);
                    };

                if (!backend.commOpen(requestMessage.content["comm_id"].str,
                                      requestMessage.content["target_name"].str,
                                      requestMessage.metadata,
                                      requestMessage.content["data"],
                                      sender))
                    closeComm();
            } catch (Exception e) {
                closeComm();
                throw e;
            }
        }
    }

    /// Forwards a `comm_msg` to `backend.commMessage`; a no-op if the backend
    /// does not define it.
    void handleCommMessage(Message requestMessage) {
        import std.traits: hasMember;

        static if (hasMember!(Backend, "commMessage")) {
            // Lets the backend publish messages parented to this request.
            scope sender = (Message msg){
                    msg.parentHeader = requestMessage.header;
                    sockets.send(sockets.ioPub, msg);
                };
            backend.commMessage(requestMessage.content["comm_id"].str,
                                requestMessage.content["data"],
                                sender);
        }
    }

    /// Forwards a `comm_close` to `backend.commClose`; a no-op if the backend
    /// does not define it.
    void handleCommClose(Message requestMessage) {
        import std.traits: hasMember;

        static if (hasMember!(Backend, "commClose")) {
            // Lets the backend publish messages parented to this request.
            scope sender = (Message msg){
                    msg.parentHeader = requestMessage.header;
                    sockets.send(sockets.ioPub, msg);
                };
            backend.commClose(requestMessage.content["comm_id"].str,
                              requestMessage.content["data"],
                              sender);

        }
    }

    /// Replies with a `shutdown_reply` on the control socket and asks the
    /// main loop to stop.
    void handleShutdown(Message requestMessage) {
        // TODO: restart
        // The content of the request is just {"restart": bool} so we reuse it
        // for the reply.
        auto replyMessage = Message(requestMessage, "shutdown_reply", requestMessage.content);
        sockets.send(sockets.control, replyMessage);
        stop = true;
    }

    /**
       Replies on the shell socket with a `kernel_info_reply` describing the
       protocol version, the implementation and the backend's language. The
       backend's `banner` is included when it defines one.
     */
    void handleKernelInfoRequest(Message requestMessage) {
        import std.json: JSONValue;
        import std.traits : hasMember;

        // Read the backend's language info once instead of once per field.
        LanguageInfo info = backend.languageInfo;

        JSONValue[string] languageInfo;
        languageInfo["name"] = info.name;
        languageInfo["version"] = info.version_;
        languageInfo["file_extension"] = info.fileExtension;
        languageInfo["mimetype"] = info.mimeType;

        JSONValue kernelInfo;
        kernelInfo["status"] = "ok";
        kernelInfo["protocol_version"] = protocolVersion;
        kernelInfo["implementation"] = "foo";
        kernelInfo["implementation_version"] = "0.0.1";
        kernelInfo["language_info"] = languageInfo;
        static if (hasMember!(Backend, "banner"))
            kernelInfo["banner"] = backend.banner;

        auto replyMessage = Message(requestMessage, "kernel_info_reply", kernelInfo);
        sockets.send(sockets.shell, replyMessage);
    }

    /**
       Asks `backend.complete` for completions of the request's code at its
       cursor position and sends the result on the shell socket.
       NOTE(review): when the backend does not define `complete`, no reply is
       sent at all; confirm that front-ends tolerate the missing reply.
     */
    void handleCompleteRequest(Message requestMessage) {
        import jupyter.wire.message: completeMessage;
        import std.traits: hasMember;

        static if (hasMember!(Backend, "complete")) {
            const result = backend.complete(requestMessage.content["code"].str,
                                            requestMessage.content["cursor_pos"].integer);
            auto msg = completeMessage(requestMessage, result);
            sockets.send(sockets.shell, msg);
        }
    }

    /**
       Executes the request's code with the backend.

       Publishes an `execute_input` message, then on success forwards the
       captured stdout, publishes an `execute_result` and sends an "ok"
       `execute_reply` on the shell socket. If an `Exception` is thrown, the
       error text is forwarded as stdout and an "error" `execute_reply` is
       sent instead. The execution counter is incremented afterwards unless
       the request explicitly disabled `store_history`.
     */
    void handleExecuteRequest(Message requestMessage) {
        import jupyter.wire.message: pubMessage;
        import std.json: JSONValue, parseJSON, JSONType;
        import std.conv: text;
        import std.string: splitLines;

        // `store_history` is optional in the messaging protocol and defaults
        // to true. Look it up without indexing so that a request lacking the
        // key cannot throw from inside the scope(exit) below.
        const storeHistoryField = "store_history" in requestMessage.content;
        const storeHistory = storeHistoryField is null
            || storeHistoryField.type == JSONType.true_;

        scope(exit) {
            if(storeHistory)
                ++executionCount;
        }

        {
            JSONValue content;
            content["execution_count"] = executionCount;
            content["code"] = requestMessage.content["code"];
            auto msg = pubMessage(requestMessage.header, "execute_input", content);
            sockets.send(sockets.ioPub, msg);
        }

        try {
            // Lets the backend publish messages parented to this request.
            scope sender = (Message msg){
                    msg.parentHeader = requestMessage.header;
                    sockets.send(sockets.ioPub, msg);
                };
            const result = backend.execute(requestMessage.content["code"].str, sender);
            sockets.stdout(requestMessage.header, result.stdout);

            {
                JSONValue content;
                content["execution_count"] = executionCount;
                content["data"] = JSONValue();
                content["data"][result.mime] = result.result;
                content["metadata"] = parseJSON(`{}`);
                sockets.publish(requestMessage.header, "execute_result", content);
            }

            {
                JSONValue content;
                content["status"] = "ok";
                content["execution_count"] = executionCount;
                content["user_variables"] = parseJSON(`{}`);
                content["user_expressions"] = parseJSON(`{}`);
                content["payload"] = parseJSON(`[]`);
                auto replyMessage = Message(requestMessage, "execute_reply", content);
                sockets.send(sockets.shell, replyMessage);
            }

        } catch(Exception e) {

            sockets.stdout(requestMessage.header, text("Error: ", e.msg));

            {
                JSONValue content;
                content["status"] = "error";
                content["execution_count"] = executionCount;
                content["ename"] = typeid(e).name;
                content["evalue"] = e.msg;
                // The protocol defines `traceback` as a list of strings, so
                // send the exception text as an array with one entry per line.
                content["traceback"] = JSONValue(text(e).splitLines);

                auto replyMessage = Message(requestMessage, "execute_reply", content);
                sockets.send(sockets.shell, replyMessage);
            }
        }
    }
}